/**
 * 
 */
package com.ws.framework.remoteservice.core.protocal;

import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.common.collect.Maps;
import com.ws.framework.remoteservice.core.consumer.SyncFuture;
import com.ws.framework.remoteservice.core.ex.ChannelInvalidException;
import com.ws.framework.remoteservice.core.ex.WRSIException;
import com.ws.framework.remoteservice.core.model.Invocation;
import com.ws.framework.remoteservice.core.model.Ping;
import com.ws.framework.remoteservice.core.model.Pong;
import com.ws.framework.remoteservice.core.model.ProviderInfo;
import com.ws.framework.remoteservice.core.model.Result;
import com.ws.framework.remoteservice.core.util.WrsiConstants;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;

/**
 * @author WSH
 *
 */
public class Docker {

	private Logger log = Logger.getLogger(this.getClass().getName());

	private static final NioEventLoopGroup workerGroup = new NioEventLoopGroup();

	private static final ConcurrentMap<String, SyncFuture<Result>> futures = Maps.newConcurrentMap();

	private Channel channel;

	private ProviderInfo info;
	
	private volatile boolean destroyed = false;
	
	private volatile boolean alive = true;

	public Docker(ProviderInfo info) {
		this.info = info;
		start();
	}

	private static final int delay = 3000;
	private static Timer timer;

	static {
		Timer timer = new Timer();
		timer.schedule(new TimerTask() {
			@Override
			public void run() {

				for (Iterator<Entry<String, SyncFuture<Result>>> iterator = futures.entrySet().iterator(); iterator
						.hasNext();) {
					Entry<String, SyncFuture<Result>> entry = iterator.next();
					if (System.currentTimeMillis()
							- entry.getValue().getBeginTime() > (WrsiConstants.DEFAULT_INVOKE_TIMEOUT_MS + delay)) {
						futures.remove(entry.getKey());
					}
				}
			}
		}, 3000, 3000);
	}

	public void start() {
		if (null == info)
			throw new WRSIException("Can't init tcp client, provider info is null");
		Bootstrap bootstrap = new Bootstrap();
		bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_BACKLOG, 20000)
				.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
					@Override
					public void initChannel(SocketChannel ch) throws Exception {
						ch.pipeline().addLast(new TransportOut(), new TransportIn(), new BasicMessageDecoderHandler());
					}
				});
		try {
			channel = bootstrap.connect(info.getHost(), info.getPort()).sync().channel();
		} catch (InterruptedException e) {
			// ignore
		}
	}

	public synchronized void sendUtilSucc(final Invocation obj, final SyncFuture<Result> syncFuture) {
		final CountDownLatch countDownLatch = new CountDownLatch(1);
		futures.putIfAbsent(obj.getId(), syncFuture);
		if (null != channel && channel.isActive()) {
			channel.writeAndFlush(obj).addListener(new ChannelFutureListener() {
				@Override
				public void operationComplete(ChannelFuture future) throws Exception {
					if (future.isSuccess()) {
						log.info("send succ.. id: " + obj.getId());
					} else {
						log.info("send failed..cause :" + future.cause());
						Result result = new Result(obj.getId());
						result.setSuccess(false);
						result.setThrowable(new ChannelInvalidException(future.cause()));
						syncFuture.setResponse(result);
					}
					countDownLatch.countDown();
				}
			});
		} else {
			Result result = new Result(obj.getId());
			result.setSuccess(false);
			result.setThrowable(new ChannelInvalidException("Channel is not available"));
			syncFuture.setResponse(result);
			countDownLatch.countDown();
		}
		try {
			countDownLatch.await();
		} catch (InterruptedException e) {
			// ignore;
		}
	}

	public void destroy() {
		workerGroup.shutdownGracefully();
		timer.cancel();
		futures.clear();
		destroyed = true;
	}

	public class BasicMessageDecoderHandler extends MessageToMessageDecoder<Object> {

		@Override
		protected void decode(ChannelHandlerContext ctx, Object obj, List<Object> out) throws Exception {
			if (obj instanceof Result) {
				Result msg = (Result)obj;
				log.log(Level.INFO, "收到请求返回 id: " + msg.getId()  + ", value: " + msg.getValue());
				SyncFuture<Result> future = futures.remove(msg.getId());
				if (future != null) {
					future.setResponse(msg);
					log.info(
							"本地调用共耗时 : " + (System.currentTimeMillis() - future.getBeginTime()) + " , 消息Id:" + msg.getId());
				}
			} if (obj instanceof Pong) {
				log.info("收到了 心跳回应...");
				setAlive(true);
			}
		}
		
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        	 //如果已经被销毁，则忽略重连
            if (!destroyed) {
                log.info(this.toString() + " disconnected... will try to reconnect in 5 sec...");
                setAlive(false);
                final EventLoop loop = ctx.channel().eventLoop();
                loop.schedule(new Runnable() {
                    public void run() {
                        log.info("Reconnecting to " + this.toString() + " ...");
                        start();
                    }
                }, 5, TimeUnit.SECONDS);
            }
        }
	}

	public ProviderInfo getInfo() {
		return this.info;
	}

	public void realse(String id) {
		futures.remove(id);
	}
	
	public void ping() {
		if (null != channel && channel.isActive()) {
			channel.writeAndFlush(Ping.instance);
		}
	}
	
	public boolean isAlive() {
		return alive && channel.isActive();
	}

	public void setAlive(boolean alive) {
		this.alive = alive;
	}
	
	@Override
	public boolean equals(Object o) {
		return o.toString().equals(this.getInfo().toString());
	}
}
